package com.ruoyi.framework.websocket;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ruoyi.common.core.domain.model.LoginUser;
import com.ruoyi.framework.web.service.TokenService;
import com.ruoyi.system.domain.SysCompanyThingsboard;
import com.ruoyi.system.service.ISysCompanyThingsboardService;
import com.ruoyi.system.service.IThingsBoardTenantService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Semaphore;

/**
 * websocket 消息处理
 * 
 * @author ruoyi
 */
@Slf4j
@Component
@ServerEndpoint("/ws/tb_duplex")
public class TbDuplexWsServer
{
    private static TokenService tokenService;
    private static ISysCompanyThingsboardService sysCompanyThingsboardService;
    private static IThingsBoardTenantService thingsBoardTenantService;

    @Autowired
    public void setTokenService(TokenService tokenService) {
        TbDuplexWsServer.tokenService = tokenService;
    }

    @Autowired
    public void setSysCompanyThingsboardService(ISysCompanyThingsboardService sysCompanyThingsboardService) {
        TbDuplexWsServer.sysCompanyThingsboardService = sysCompanyThingsboardService;
    }

    @Autowired
    public void setThingsBoardTenantService(IThingsBoardTenantService thingsBoardTenantService) {
        TbDuplexWsServer.thingsBoardTenantService = thingsBoardTenantService;
    }

    public static int socketMaxOnlineCount = 100;
    private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);
    private static Map<String, TbWsClient> tbWsClientMap = new HashMap<>();
    private static Map<String, LoginUser> loginUserMap = new HashMap<>();

    @OnOpen
    public void onOpen(Session session) throws Exception {
        boolean semaphoreFlag = false;
        // 尝试获取信号量
        semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);
        if (!semaphoreFlag) {
            // 未获取到信号量
            log.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);
            WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数：" + socketMaxOnlineCount);
            session.close();
        } else {
            // 添加用户
            WebSocketUsers.put(session.getId(), session);
            log.info("\n 建立连接 - {}", session);
            log.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());
            WebSocketUsers.sendMessageToUserByText(session, "连接成功");
        }
    }

    @OnClose
    public void onClose(Session session) {
        log.info("\n 关闭连接 - {}", session);
        // 移除用户
        WebSocketUsers.remove(session.getId());
        loginUserMap.remove(session.getId());
        try {
            TbWsClient client = tbWsClientMap.remove(session.getId());
            if (client != null) {
                client.getSession().close();
            }
        } catch (Exception e) {
            log.info("",e);
            StackTraceElement[] traces = e.getStackTrace();
            for (StackTraceElement trace : traces) {
                log.debug(trace.toString());
            }
        }
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    @OnError
    public void onError(Session session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            // 关闭连接
            session.close();
        }
        String sessionId = session.getId();
        log.info("\n 连接异常 - {}", sessionId);
        log.info("\n 异常信息 - {}", exception);
        // 移出用户
        WebSocketUsers.remove(sessionId);
        // 获取到信号量则需释放
        SemaphoreUtils.release(socketSemaphore);
    }

    private Boolean isLoggedIn(Session session) {
        LoginUser user = loginUserMap.get(session.getId());
        return (user != null);
    }

    private Boolean logIn(Session session, JSONObject request) {
        Boolean done = false;
        String command = request.getString("command");
        if (command.equals("login")) {
            JSONObject params = request.getJSONObject("params");
            if (params != null) {
                String token = params.getString("token");
                if (token != null) {
                    LoginUser user = tokenService.getLoginUser(token);
                    if (user != null) {
                        loginUserMap.put(session.getId(), user);
                        done = true;
                    }
                }    
            }
        }
        return done;
    }

    private Boolean isTbWsClientReady(Session session) {
        return tbWsClientMap.containsKey(session.getId());
    }

    private Boolean buildTbWsClient(Session session) {
        Boolean done = false;
        try {
            LoginUser user = loginUserMap.get(session.getId());
            SysCompanyThingsboard sysCompanyThingsboard = sysCompanyThingsboardService.selectSysCompanyThingsboardByUser(user.getUser());
            String url = thingsBoardTenantService.getWsUrl(sysCompanyThingsboard);
            log.info("tb ws url: " + url);
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            TbWsClient client = new TbWsClient();
            container.connectToServer(client, new URI(url));
            client.setDownStream(session);
            tbWsClientMap.put(session.getId(), client);
            done = true;
        } catch (Exception e) {
            log.info("",e);
            StackTraceElement[] traces = e.getStackTrace();
            for (StackTraceElement trace : traces) {
                log.debug(trace.toString());
            }
        }
        return done;
    }

    private void sendTbWsMessage(Session session, String message) {
        TbWsClient client = tbWsClientMap.get(session.getId());
        if (client == null) {
            return;
        }
        client.getSession().getAsyncRemote().sendText(message);
    }

    private void subscribe(Session session, JSONObject request) {
        JSONObject params = request.getJSONObject("params");
        if (params == null) {
            fail(session, "请求格式错误");
            return;
        }
        String deviceId = params.getString("deviceId");
        if (deviceId == null) {
            fail(session, "请求格式错误");
            return;
        }
        JSONObject command = new JSONObject();
        JSONArray cmds = new JSONArray();
        JSONObject cmd = new JSONObject();
        cmd.put("entityType", "DEVICE");
        cmd.put("entityId", deviceId);
        cmd.put("scope", "LATEST_TELEMETRY");
        Random random = new Random();
        cmd.put("cmdId", random.nextInt(16));
        cmds.add(cmd);
        command.put("tsSubCmds", cmds);
        command.put("historyCmds", new JSONArray());
        command.put("attrSubCmds", new JSONArray());
        sendTbWsMessage(session, command.toJSONString());
    }

    private String rpc(Session session, JSONObject request) {
        JSONObject params = request.getJSONObject("params");
        if (params == null) {
            fail(session, "请求格式错误");
            return "请求格式错误";
        }
        String deviceId = params.getString("deviceId");
        if (deviceId == null) {
            fail(session, "请求格式错误");
            return "请求格式错误";
        }
        JSONObject requestBody = params.getJSONObject("requestBody");;
        if (requestBody == null) {
            fail(session, "请求格式错误");
            return "请求格式错误";
        }
        LoginUser user = loginUserMap.get(session.getId());
        SysCompanyThingsboard sysCompanyThingsboard = sysCompanyThingsboardService.selectSysCompanyThingsboardByUser(user.getUser());
       return thingsBoardTenantService.sendRpcRequest(deviceId, requestBody, sysCompanyThingsboard);
    }

    private void success(Session session, JSONObject data, String message) {
        JSONObject result = new JSONObject();
        result.put("error", false);
        result.put("data", data);
        result.put("message", message);
        WebSocketUsers.sendMessageToUserByText(session, result.toJSONString());
    }

    private void fail(Session session, String message) {
        JSONObject result = new JSONObject();
        result.put("error", true);
        result.put("message", message);
        WebSocketUsers.sendMessageToUserByText(session, result.toJSONString());
    }

    @OnMessage
    public void onMessage(String message, Session session) {
        JSONObject request = JSON.parseObject(message);
        if (request == null || request.getString("command") == null) {
            fail(session, "请求格式错误");
            return;
        }
        if (!isLoggedIn(session)) {
            if (!logIn(session, request)) {
                try {
                    session.close();
                } catch(Exception e) {}
                return;
            }
        }
        if (!isTbWsClientReady(session)) {
            if (!buildTbWsClient(session)) {
                fail(session, "物联平台websocket连接失败");
                return;
            }
            success(session, null, "物联平台websocket连接成功");
        }
        String command = request.getString("command");
        if (command.equals("subscribe")) {
            subscribe(session, request);
        } else if (command.equals("rpc")) {
            String r = rpc(session, request);
            JSONObject result = new JSONObject();
            result.put("message", r);
            WebSocketUsers.sendMessageToUserByText(session, result.toJSONString());
        }
    }
}
